package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.JedisUtil;
import io.lettuce.core.api.StatefulRedisConnection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.client.AsyncConnection;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Asynchronous dimension-lookup function: for every input record it resolves a
 * dimension row by key and lets the subclass merge it into the record.
 *
 * <p>Lookup order is Redis first, then HBase on a cache miss; a row found in
 * HBase is written back to Redis. Lookups are best-effort: if no dimension row
 * can be obtained the record is emitted unchanged.
 *
 * @param <T> type of the stream record, used as both input and output
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    // Connections are created in open() on the task side, so they are never
    // part of the serialized function instance.
    private transient AsyncConnection asyncConnection;
    private transient StatefulRedisConnection<String, String> redisConnection;
    private final String tableName;

    /**
     * @param tableName name of the dimension table passed to the Redis and HBase lookups
     */
    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    /**
     * Extracts the dimension lookup key from a record.
     *
     * @param input the record being enriched
     * @return the key used for the Redis and HBase lookups
     */
    public abstract String getKey(T input);

    /**
     * Merges the looked-up dimension row into the record.
     * Only called when a dimension row was found.
     *
     * @param input   the record being enriched (mutated in place)
     * @param dimInfo the dimension row
     */
    public abstract void join(T input, JSONObject dimInfo);

    /** Opens the HBase and Redis connections used by the lookups. */
    @Override
    public void open(Configuration parameters) throws Exception {
        asyncConnection = HBaseUtil.getAsyncConnection();
        redisConnection = JedisUtil.getAsyncRedisConnection();
    }

    /**
     * Looks up the dimension row for {@code input} off the calling thread and
     * completes {@code resultFuture} with the (possibly enriched) record.
     *
     * @param input        the record to enrich
     * @param resultFuture completed with a single-element list holding {@code input},
     *                     or exceptionally if {@link #join} or the completion itself fails
     */
    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {

        String key = getKey(input);

        CompletableFuture
                .supplyAsync(() -> readFromRedis(key))
                .thenApplyAsync(cached -> cached != null ? cached : readFromHBaseAndCache(key))
                .thenAccept(dimInfo -> {
                    if (dimInfo != null) {
                        join(input, dimInfo);
                    }
                    resultFuture.complete(Collections.singletonList(input));
                })
                // Without this handler a failure in the chain would be dropped and the
                // result future would never be completed.
                .exceptionally(error -> {
                    resultFuture.completeExceptionally(error);
                    return null;
                });
    }

    /**
     * Called when the lookup did not finish within the configured timeout.
     * The record is emitted without dimension data so that it does not stay
     * pending in the operator forever.
     */
    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("TimeOut>>>>>>>>>>" + input);
        // The result future must always be completed; emit the record un-enriched,
        // consistent with how a failed lookup is handled in asyncInvoke.
        resultFuture.complete(Collections.singletonList(input));
    }

    /** Closes both connections; the HBase connection is closed even if closing Redis fails. */
    @Override
    public void close() throws Exception {
        try {
            // Either connection may still be null if open() failed part-way.
            if (redisConnection != null) {
                redisConnection.close();
            }
        } finally {
            if (asyncConnection != null) {
                asyncConnection.close();
            }
        }
    }

    /** Reads the dimension row from Redis; returns null on a miss or on any lookup error. */
    private JSONObject readFromRedis(String key) {
        try {
            return DimInfoFunction.getRedisDimInfo(redisConnection, tableName, key);
        } catch (Exception e) {
            // Best-effort: treat a cache failure as a miss and fall through to HBase.
            System.err.println("Redis dim lookup failed for " + tableName + ":" + key + " - " + e);
            return null;
        }
    }

    /**
     * Reads the dimension row from HBase and, when found, writes it back to Redis.
     * Returns null if the row is missing or the HBase lookup fails.
     */
    private JSONObject readFromHBaseAndCache(String key) {
        // Query the data from HBase
        JSONObject data;
        try {
            data = HBaseUtil.getData(asyncConnection, tableName, key);
        } catch (Exception e) {
            System.err.println("HBase dim lookup failed for " + tableName + ":" + key + " - " + e);
            return null;
        }

        if (data == null) {
            // Nothing to cache and nothing to join.
            return null;
        }

        try {
            DimInfoFunction.setRedisDimInfo(redisConnection, tableName, key, data.toJSONString());
        } catch (Exception e) {
            // A failed cache write must not discard a successful HBase lookup.
            System.err.println("Redis dim write-back failed for " + tableName + ":" + key + " - " + e);
        }
        return data;
    }
}
